-
-
Notifications
You must be signed in to change notification settings - Fork 20
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add executor to zimscraperlib #211
base: main
Are you sure you want to change the base?
Conversation
b87c5b7
to
a080294
Compare
Codecov ReportAll modified and coverable lines are covered by tests ✅
Additional details and impacted files@@ Coverage Diff @@
## main #211 +/- ##
==========================================
Coverage 100.00% 100.00%
==========================================
Files 38 39 +1
Lines 2221 2327 +106
Branches 426 446 +20
==========================================
+ Hits 2221 2327 +106 ☔ View full report in Codecov by Sentry. |
3e08c87
to
0ce636c
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM but it lacks proper documentation. It's important the expected behavior is clearly documented so the user can make informed choices
|
||
@property | ||
def exception(self): | ||
"""Exception raises in any thread, if any""" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
"""Exception raises in any thread, if any""" | |
"""Exception raised in any thread, if any""" |
self._workers: set[threading.Thread] = set() | ||
self.no_more = False | ||
self._shutdown = False | ||
self.exceptions[:] = [] |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
self.exceptions[:] = [] | |
self.exceptions.clear() |
self._shutdown = False | ||
self.exceptions[:] = [] | ||
|
||
for n in range(self.nb_workers): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No single-letter variable
try: | ||
func(**kwargs) | ||
except Exception as exc: | ||
logger.error(f"Error processing {func} with {kwargs=}") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
logger.error(f"Error processing {func} with {kwargs=}") | |
logger.error(f"Error processing function {func.__name__} with {kwargs=}") |
# received None from the queue. most likely shuting down | ||
return | ||
|
||
raises = kwargs.pop("raises") if "raises" in kwargs.keys() else False |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
those things are now part of the submit API and should be documented there (in its docstring)
logger.error(f"Error processing {func} with {kwargs=}") | ||
logger.exception(exc) | ||
if raises: # to cover when raises = False | ||
self.exceptions.append(exc) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is this intended behavior? Exceptions are swallowed without a trace (when using raise=False
)? If so, this must be clearly documented. Not raising but storing the exception is another valid alternative.
except queue.Empty: | ||
break | ||
|
||
def join(self): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What's the recommended way to await an executor completion then?
With this, the deadline_sec
being mandatory, I can only use join
when I want to exit but in a scraper, I imagine the scenario being: I collect all my resources and submit them to the executor then I join and once everything has been processed (once join completes), I can continue.
This is the regular meaning of join
.
If we want it to act as a properly exit method, then the user has to track progress manually and this should be clearly specified in the documentation of the executor
while ( | ||
len(alive_threads) > 0 and datetime.datetime.now(tz=datetime.UTC) < deadline | ||
): | ||
e = threading.Event() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
single-letter var
Converting to draft, we are experimenting with joblib in mindtouch scraper for now |
This PR enrich the scraperlib with a
ScraperExecutor
. This class is capable to process tasks in parallel, with a given number of worker threads.This executor is mainly inspired from sotoki executor, even if we can find other executors in wikihow and in iFixit. wikihow one seems more primitive / ancient, and iFixit is just a pale copy.
For easy review, first commit is simply a copy/paste of sotoki code, and next commit are the adaptations / enhancement for scraperlib
What has been changed compared to sotoki code:
thread_deadline_sec
to the executor, should we need to customize it per executor (probably the case, priceless and useful for tests at least)if self.no_more:
insubmit
method: allows to stop accepting task even when the executor is justjoined
and notshutdown
prefix
toexecutor_name
and moved fromT-
toexecutor
(way more clear in the logs from my experience)release_halt
method which was misleading / not working (I failed tojoin
and thenrelease_halt
and thensubmit
again ... it seems mandatory tojoin
thenstart
(again) thensubmit
)thread_deadline_sec
seconds per thread. This is highly unpredictable when there are many workers (we could waitthread_deadline_sec
for first worker, thenthread_deadline_sec
for second worker, etc ...), and it is a bit weird that last worker in the list has way more time to complete than first oneThis executor will be used right now in mindtouch scraper.